{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "61ddebdd-eb15-42c9-8211-304522742e4f",
   "metadata": {},
   "source": [
    "# Step 0. Data Preparation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ccc8b87a-3661-42ef-a845-7b82045b3360",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "!pip install faker --quiet\n",
    "!pip install hsfs==3.7.6 --quiet\n",
    "!pip install hopsworks --quiet\n",
    "!pip install httpimport --quiet\n",
    "!pip install ipython-secrets --quiet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0e84ef9a-b1c1-46da-9497-2c41f706b76a",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import httpimport\n",
    "\n",
    "url = \"https://raw.githubusercontent.com/logicalclocks/hopsworks-tutorials/master/integrations/pyspark_streaming/synthetic_data\"\n",
    "\n",
    "synthetic_data = httpimport.load(\"synthetic_data\", url)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d67bf392-d6c5-4664-8e7b-bbc374c0f64c",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "from confluent_kafka import Producer"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "27857e35-b798-4937-a4cb-5709e378ad6f",
   "metadata": {},
   "source": [
    "## Creating Simulated Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3a6f4e12-b051-4c5b-8984-5eefed1ec9ff",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "data_simulator = synthetic_data.synthetic_data()\n",
    "\n",
    "profiles_df, trans_df = data_simulator.create_simulated_transactions()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b93c8a1d-7378-4922-873d-f1475dfebde8",
   "metadata": {},
   "source": [
    "## Connecting to Hopsworks Feature Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "80f0f8ea-655e-447e-b36a-ed5a513b63df",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import hopsworks\n",
    "from ipython_secrets import *\n",
    "KEY = get_secret('HOPSWORKS_API_KEY')\n",
    "project = hopsworks.login(host=\"c.app.hopsworks.ai\", api_key_value=KEY)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "163e4352-09b2-46ee-9a84-eb1ed0136492",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "kafka_api = project.get_kafka_api()\n",
    "\n",
    "fs = project.get_feature_store()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b4d2cbef-68a3-4684-9456-6e2218aa59fc",
   "metadata": {},
   "source": [
    "## Creating Feature Groups"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "23d638d7-91ee-4f10-bb54-3c56a5070be0",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "profile_fg = fs.get_or_create_feature_group(\n",
    "        name=\"profile\",\n",
    "        primary_key=[\"cc_num\"],\n",
    "        partition_key=[\"cc_provider\"],\n",
    "        online_enabled=True,\n",
    "        version=1\n",
    ")\n",
    "\n",
    "profile_fg.insert(profiles_df, overwrite=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8034c83e-3239-4026-bc87-d306d806f352",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "profiles_df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a7f2e11c-9bd2-4500-a0aa-3d5e6ac50173",
   "metadata": {},
   "source": [
    "## Kafka Topic and Schema Creation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e9716cab-1ccc-4ab6-a34c-c5257c5b41df",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# create kafka topic\n",
    "KAFKA_INPUT_TOPIC = \"transactions_topic_\" + str(project.id)\n",
    "SCHEMA_NAME = \"transactions_schema_\" + str(project.id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "73710de8-225d-411f-a684-3ac89ec262bd",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "schema = {\n",
    "    \"type\": \"record\",\n",
    "    \"name\": KAFKA_INPUT_TOPIC,\n",
    "    \"namespace\": \"io.hops.examples.feldera.example\",\n",
    "    \"fields\": [\n",
    "        {\n",
    "            \"name\": \"tid\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"string\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"date_time\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                {\n",
    "                    \"type\": \"string\",\n",
    "                    \"logicalType\": \"timestamp-micros\"\n",
    "                }\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"cc_num\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"string\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"category\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"string\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"amount\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"double\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"latitude\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"double\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"longitude\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"double\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"city\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"string\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"country\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"string\"\n",
    "            ]\n",
    "        },\n",
    "        {\n",
    "            \"name\": \"fraud_label\",\n",
    "            \"type\": [\n",
    "                \"null\",\n",
    "                \"int\"\n",
    "            ]\n",
    "        },\n",
    "    ]\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d1c8bcf6-ea41-49e3-b95d-342cb7a49bff",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "if KAFKA_INPUT_TOPIC not in [topic.name for topic in kafka_api.get_topics()]:\n",
    "    kafka_api.create_schema(KAFKA_INPUT_TOPIC, schema)\n",
    "    kafka_api.create_topic(KAFKA_INPUT_TOPIC, KAFKA_INPUT_TOPIC, 1, replicas=1, partitions=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7430b2f1-b9d0-40ad-80bc-cb6ee191a767",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from hsfs import engine\n",
    "\n",
    "kafka_config = engine.get_instance()._get_kafka_config(fs.id, {})"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "14d01b4a-ec78-48b7-b8ad-5bc5b3875c84",
   "metadata": {},
   "source": [
    "## Sending Data using created Kafka Topic"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f38b59d0-494d-4827-be57-19c5893a905b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "trans_df = trans_df.rename(columns={\"datetime\": \"date_time\"})\n",
    "\n",
    "trans_df[\"tid\"] = trans_df[\"tid\"].astype(\"string\")\n",
    "trans_df[\"date_time\"] = trans_df[\"date_time\"].astype(\"datetime64[s]\").astype(\"string\")\n",
    "trans_df[\"cc_num\"] = trans_df[\"cc_num\"].astype(\"string\")\n",
    "trans_df[\"category\"] = trans_df[\"category\"].astype(\"string\")\n",
    "trans_df[\"amount\"] = trans_df[\"amount\"].astype(\"double\")\n",
    "trans_df[\"latitude\"] = trans_df[\"latitude\"].astype(\"double\")\n",
    "trans_df[\"longitude\"] = trans_df[\"longitude\"].astype(\"double\")\n",
    "trans_df[\"city\"] = trans_df[\"city\"].astype(\"string\")\n",
    "trans_df[\"country\"] = trans_df[\"country\"].astype(\"string\")\n",
    "trans_df[\"fraud_label\"] = trans_df[\"fraud_label\"].astype(\"int\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6747774e-bac3-4463-a2ea-556e78e65b96",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "producer = Producer(kafka_config)\n",
    "\n",
    "for index, transaction in trans_df.iterrows():\n",
    "    producer.produce(KAFKA_INPUT_TOPIC, transaction.to_json())\n",
    "    \n",
    "    if index % 5000 == 0:\n",
    "        producer.flush()\n",
    "        print(f'Finished sending index {index}')\n",
    "\n",
    "producer.flush()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
